feat: AsyncPipeline that can schedule components to run concurrently #8812
Conversation
@Amnah199 @davidsbatista much smaller diff now that the other PR is merged. This is largely the same as the PR that we already merged to experimental with the following differences:
```diff
@@ -23,6 +23,7 @@
     "default_to_dict",
     "DeserializationError",
     "ComponentError",
+    "AsyncPipeline",
```
(nit) suggestion: keeping these imports ordered alphabetically makes entries easier to find as the list grows:
```python
__all__ = [
    "Answer",
    "AsyncPipeline",
    "ComponentError",
    "DeserializationError",
    "Document",
    "ExtractedAnswer",
    "GeneratedAnswer",
    "Pipeline",
    "PredefinedPipeline",
    "component",
    "default_from_dict",
    "default_to_dict",
]
```
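As a side note, a tiny guard test (hypothetical, not part of this PR) can enforce the ordering automatically, so reviewers never have to check it by hand:

```python
# Hypothetical guard: fails as soon as a new export is appended out of
# alphabetical order. EXPORTS mirrors the __all__ list above.
EXPORTS = [
    "Answer",
    "AsyncPipeline",
    "ComponentError",
    "DeserializationError",
    "Document",
    "ExtractedAnswer",
    "GeneratedAnswer",
    "Pipeline",
    "PredefinedPipeline",
    "component",
    "default_from_dict",
    "default_to_dict",
]


def assert_sorted(names: list) -> None:
    """Raise AssertionError if the export list is not sorted."""
    assert names == sorted(names), f"__all__ is not alphabetically sorted: {names}"


assert_sorted(EXPORTS)
```

Note that `sorted()` is case-sensitive (uppercase before lowercase), which happens to match the ordering used above.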
I did another quick review, although most of this was already reviewed before. From my side it's approved, but to play it safe, let's wait for Amna to also do another quick review before merging.
LGTM
Co-authored-by: Amna Mubashar <[email protected]>
```python
async_loop = asyncio.new_event_loop()
asyncio.set_event_loop(async_loop)
```

Here as well, we can avoid manual handling of loops by using asyncio.run, if you feel that would be better.
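For reference, a minimal before/after sketch of that suggestion, using a stand-in coroutine (hypothetical; the pipeline itself isn't needed to show the loop handling):

```python
import asyncio


async def work(wait_for: int) -> int:
    # Stand-in for a pipeline coroutine such as pp.run_async(...)
    await asyncio.sleep(wait_for / 100)
    return wait_for


async def run_all():
    return await asyncio.gather(work(1), work(2))


# Manual loop handling, as the test currently does:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    results_manual = loop.run_until_complete(run_all())
finally:
    loop.close()

# Equivalent with asyncio.run, which creates, runs, and closes the loop:
results_run = asyncio.run(run_all())

assert results_manual == results_run == [1, 2]
```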
```python
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    pp = AsyncPipeline()
    pp.add_component("wait", waiting_component())

    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(async_loop)

    async def run_all():
        # Create concurrent tasks for each pipeline run
        tasks = [pp.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)

    try:
        async_loop.run_until_complete(run_all())
        component_spans = [sp for sp in spying_tracer.spans if sp.operation_name == "haystack.component.run_async"]
        for span in component_spans:
            assert span.tags["haystack.component.visits"] == 1
    finally:
        async_loop.close()
```
Something like this? (although I didn't test it)
Suggested change:

```python
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    """
    Test that the AsyncPipeline can execute multiple runs concurrently and that
    each component is called exactly once per run (as indicated by the 'visits' tag).
    """
    async_pipeline = AsyncPipeline()
    async_pipeline.add_component("wait", waiting_component())
    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async def run_all():
        tasks = [async_pipeline.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)
        component_spans = [
            sp for sp in spying_tracer.spans
            if sp.operation_name == "haystack.component.run_async"
        ]
        for span in component_spans:
            expected_visits = 1
            actual_visits = span.tags.get("haystack.component.visits")
            assert actual_visits == expected_visits, (
                f"Expected {expected_visits} visit, got {actual_visits} for span {span}"
            )

    # Use asyncio.run to manage the event loop.
    asyncio.run(run_all())
```
LGTM! Thanks again @mathislucka.
Much appreciated!
Excellent work @mathislucka 🚀! I've waited a long time for this feature! I hope the documentation will be updated soon as well.

```python
import asyncio
from haystack import AsyncPipeline
from haystack import component
from datetime import datetime


def print_with_prefix(pipeline_prefix: str, component_name: str, message: str):
    """Prints a message prefixed with the pipeline and component name."""
    print(f"[{pipeline_prefix}] {component_name} {message} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


async def async_sleep_task(duration: int):
    await asyncio.sleep(duration)


@component
class ComponentA:
    @component.output_types(A_output=str)
    def run(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentA", "run started")
        result = {"A_output": f"Processed by A: {dummy}"}
        print_with_prefix(prefix, "ComponentA", "run ended")
        return result

    @component.output_types(A_output=str)
    async def run_async(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentA", "run_async started")
        await async_sleep_task(3)
        result = {"A_output": f"Processed by A: {dummy}"}
        print_with_prefix(prefix, "ComponentA", "run_async ended")
        return result


@component
class ComponentB:
    @component.output_types(B_output=str)
    def run(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentB", "run started")
        result = {"B_output": f"Processed by B: {dummy}"}
        print_with_prefix(prefix, "ComponentB", "run ended")
        return result

    @component.output_types(B_output=str)
    async def run_async(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentB", "run_async started")
        await async_sleep_task(2)
        result = {"B_output": f"Processed by B: {dummy}"}
        print_with_prefix(prefix, "ComponentB", "run_async ended")
        return result


@component
class ComponentC:
    @component.output_types(C_output=str)
    def run(self, A_output: str, B_output: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentC", "run started")
        result = {"C_output": f"C combined outputs: {A_output}, {B_output}"}
        print_with_prefix(prefix, "ComponentC", "run ended")
        return result

    @component.output_types(C_output=str)
    async def run_async(self, A_output: str, B_output: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentC", "run_async started")
        await async_sleep_task(1)
        result = {"C_output": f"C combined outputs: {A_output}, {B_output}"}
        print_with_prefix(prefix, "ComponentC", "run_async ended")
        return result


def create_pipeline(name: str):
    pipeline = AsyncPipeline()
    pipeline.name = name
    comp_a = ComponentA()
    comp_b = ComponentB()
    comp_c = ComponentC()
    comp_a.pipeline_name = name
    comp_b.pipeline_name = name
    comp_c.pipeline_name = name
    pipeline.add_component("A", comp_a)
    pipeline.add_component("B", comp_b)
    pipeline.add_component("C", comp_c)
    pipeline.connect("A.A_output", "C.A_output")
    pipeline.connect("B.B_output", "C.B_output")
    return pipeline


if __name__ == "__main__":
    async def run_pipeline(pipeline, input_data):
        output = await pipeline.run_async(input_data)
        print(f"[{pipeline.name}] Pipeline output: {output}")

    async def main():
        input_data1 = {"dummy": "Test data 1"}
        input_data2 = {"dummy": "Test data 2"}
        pipeline1 = create_pipeline("P1")
        pipeline2 = create_pipeline("P2")
        # Run both pipelines concurrently.
        task1 = asyncio.create_task(run_pipeline(pipeline1, input_data1))
        task2 = asyncio.create_task(run_pipeline(pipeline2, input_data2))
        await asyncio.gather(task1, task2)

    asyncio.run(main())
```
…eepset-ai#8812) * add component checks * pipeline should run deterministically * add FIFOQueue * add agent tests * add order dependent tests * run new tests * remove code that is not needed * test: intermediate from cycle outputs are available outside cycle * add tests for component checks (Claude) * adapt tests for component checks (o1 review) * chore: format * remove tests that aren't needed anymore * add _calculate_priority tests * revert accidental change in pyproject.toml * test format conversion * adapt to naming convention * chore: proper docstrings and type hints for PQ * format * add more unit tests * rm unneeded comments * test input consumption * lint * fix: docstrings * lint * format * format * fix license header * fix license header * add component run tests * fix: pass correct input format to tracing * fix types * format * format * types * add defaults from Socket instead of signature - otherwise components with dynamic inputs would fail * fix test names * still wait for optional inputs on greedy variadic sockets - mirrors previous behavior * fix format * wip: warn for ambiguous running order * wip: alternative warning * fix license header * make code more readable Co-authored-by: Amna Mubashar <[email protected]> * Introduce content tracing to a behavioral test * Fixing linting * Remove debug print statements * Fix tracer tests * remove print * test: test for component inputs * test: remove testing for run order * chore: update component checks from experimental * chore: update pipeline and base from experimental * refactor: remove unused method * refactor: remove unused method * refactor: outdated comment * refactor: inputs state is updated as side effect - to prepare for AsyncPipeline implementation * format * test: add file conversion test * format * fix: original implementation deepcopies outputs * lint * fix: from_dict was updated * fix: format * fix: test * test: add test for thread safety * remove unused imports * format * test: 
FIFOPriorityQueue * chore: add release note * feat: add AsyncPipeline * chore: Add release notes * fix: format * debug: switch run order to debug ubuntu and windows tests * fix: consider priorities of other components while waiting for DEFER * refactor: simplify code * fix: resolve merge conflict with mermaid changes * fix: format * fix: remove unused import * refactor: rename to avoid accidental conflicts * fix: track pipeline type * fix: and extend test * fix: format * style: sort alphabetically * Update test/core/pipeline/features/conftest.py Co-authored-by: Amna Mubashar <[email protected]> * Update test/core/pipeline/features/conftest.py Co-authored-by: Amna Mubashar <[email protected]> * Update releasenotes/notes/feat-async-pipeline-338856a142e1318c.yaml * fix: indentation, do not close loop * fix: use asyncio.run * fix: format --------- Co-authored-by: Amna Mubashar <[email protected]> Co-authored-by: David S. Batista <[email protected]>
Thanks @alex-stoica! Yes, we will have updated documentation for the 2.10 release. I'm glad that you find it useful and that it works as expected. Since this is quite a complex feature, please let me know if you find anything that doesn't work as you would expect.
Related Issues
Proposed Changes:
Implements an AsyncPipeline that supports:
- a run method with concurrent execution of components

How did you test it?
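To illustrate the concurrency claim, here is a minimal sketch with plain coroutines standing in for two independent components (hypothetical names, not the pipeline API itself); when neither depends on the other's output, running them concurrently takes roughly the longest single duration rather than the sum:

```python
import asyncio
import time


async def component_a() -> str:
    # Stand-in for an independent component's run_async
    await asyncio.sleep(0.2)
    return "A done"


async def component_b() -> str:
    await asyncio.sleep(0.2)
    return "B done"


async def main() -> list:
    # Independent components are awaited concurrently, not sequentially.
    return await asyncio.gather(component_a(), component_b())


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

assert results == ["A done", "B done"]
# elapsed is ~0.2s when concurrent; sequential execution would take ~0.4s
```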
Notes for the reviewer
Review after #8707
Code was reviewed here before: deepset-ai/haystack-experimental#180
Checklist
- The PR title uses one of the conventional commit prefixes (fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test:) and adds ! in case the PR includes breaking changes.